package AkkaD

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable
import scala.concurrent.duration._


class Master(val host:String,val port:Int) extends Actor{

  val CHECK_INTERVAL = 15000

  //改造第十三步:key是id,value是WorkerInfo
  val id2Worker = new mutable.HashMap[String,WorkerInfo]()
  val workers = new mutable.HashSet[WorkerInfo]()   //为了方便,定义一个HashSet


  //改造第二步:在Master构造方法执行之后,Receive方法之前,要启动一个定时器
  override def preStart(): Unit = {
    //启动一个定时器
    import context.dispatcher   //很重要的作用:schedule方法需要一个隐式参数ExecutionContext

    //其中的检查消息是Master内部的一个消息,相当于发给自己的一个消息,拿到自己的引用,使用self
    context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)

  }

  //源码:Receive是一个偏函数
  //接收要处理的消息,会被调用多次,每一个Actor都会调用
  override def receive:Receive = {

    //改造第十步:接收Worker发来的注册消息
    case RegisterWorker(workerId, memory, cores) => {
      //改造第十一步:将Worker的信息封装保存起来=>定义一个新的类WorkerInfo.class
      val workerInfo: WorkerInfo = new WorkerInfo(workerId, memory, cores)
      //保存到集合中(对应改造第十三步)
      id2Worker(workerId) = workerInfo //参考Scala语法总结16,更新映射中的值
      //id2Worker.put(workerId,workerInfo)  //参考Scala语法总结16,向可变映射中添加元素
      //id2Worker += (workerId -> workerInfo) //Scala语法总结16,使用+=添加多个元素
      //id2Worker += {(workerId,workerInfo)}  //添加对偶元组
      workers += workerInfo //为了方便,放到一个HashSet集合中
      //更新:通知worker注册
      sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM_NAME}@$host:$port/user/${Master.MASTER_ACTOR_NAME}")
    }

      //接收已经在Master注册过的worker定时发送来的心跳
    case HeartBeat(workerId) => {
      val workerInfo = id2Worker(workerId)
      //更新:报告活着 把该worker给master心跳的时间记录下来
      val current = System.currentTimeMillis()
      workerInfo.lastHeartbeatTime = current
    }

    //改造第三步:Master发送给自己的一个内部检测消息
    case CheckTimeOutWorker =>{
      println("internal msg CheckTimeOutWorker")

      //改造第十四步:Master检测Worker的超时时间
      val current = System.currentTimeMillis()
      //过滤掉超时的workers
      val deadWorkers = workers.filter(w => current - w.lastHeartbeatTime > CHECK_INTERVAL)
            for (workerInfo <- deadWorkers){
              id2Worker -= workerInfo.id
              workers -= workerInfo
            }
//      deadWorkers.foreach(workerInfo => {
//        id2Worker -= workerInfo.id
//        workers -= workerInfo
//      })

      println(workers.size)
    }




//    case "Register" =>
//        println("a register msg from worker")
//      //第八步:Master收到Worker发来的消息后,再给消息的发送者Worker返回一个消息
//      //此时Master也拿到了一个Worker的代理对象,sender()可以拿到消息发送者的引用
//      sender ! "Response"
//
//    case "HeartBeat" =>
//
//
//    case "CheckTimeOut" =>
//      println("internal msg check")
  }
}

object Master{

  //改造第一步:抽取变量
  //Java中的public static final = Scala中的val
  val MASTER_ACTOR_SYSTEM_NAME = "MasterSystem"
  val MASTER_ACTOR_NAME = "Master"

  def main(args: Array[String]): Unit = {

    val host = args(0)
    val port = args(1).toInt
    val configStr =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config: Config = ConfigFactory.parseString(configStr)
    //第一步:创建ActorSystem(单例)
    val actorSystem: ActorSystem = ActorSystem(MASTER_ACTOR_SYSTEM_NAME,config)

    //第二步:通过ActorSystem创建Actor
    //Props[]中可以传N个Actor,此次传一个Master的class;取名字为MasterA
    val masterActor: ActorRef = actorSystem.actorOf(Props(new Master(host,port)),MASTER_ACTOR_NAME)

    //第三步:发送消息
    // ! 代表是异步消息
    //masterActor ! "CheckTimeOut"

  }
}
